package cn.com.sina.rd.dsp.balancer;

import cn.com.sina.rd.dsp.model.Cluster;
import cn.com.sina.rd.dsp.model.Load;
import cn.com.sina.rd.dsp.model.Server;
import cn.com.sina.rd.dsp.tools.MoveOnce;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

import java.util.*;

import static cn.com.sina.rd.dsp.balancer.doBalance.*;

/**
 * Builds a load-movement plan for a {@link Cluster}.
 *
 * <p>{@link #makedecision()} classifies every server of the cluster as "heavy"
 * (weight above the cluster high gate) or "light", then repeatedly moves the first
 * load of the heaviest server onto the lightest server. Depending on how the cluster
 * average compares with the high and low gates, the plan is tagged as Expand, Shrink
 * or Balance. The moves and any servers/loads that could not be handled are returned
 * as a JSON object.
 *
 * <p>Created with IntelliJ IDEA.
 * User: xinrong3
 * Date: 14-1-10
 * Time: 20:06
 */
public class ShipLoad {
    public static final String MOVE = "MOVE";
    public static final String MOVE_ARRAY = "MOVE_ARRAY";
    public static final String MOVE_COUNT = "MOVE_COUNT";

    /** Relative standard deviation above which a second balancing pass is attempted. */
    private static final double DEVIATION_THRESHOLD = 0.2d;

    /** Orders servers by weight, heaviest first. */
    static Comparator<Server> heavyComparator = new Comparator<Server>() {
        @Override
        public int compare(Server o1, Server o2) {
            return o2.getWeight().compareTo(o1.getWeight());
        }
    };

    /** Orders servers by weight, lightest first. */
    static Comparator<Server> lightComparator = new Comparator<Server>() {
        @Override
        public int compare(Server o1, Server o2) {
            return o1.getWeight().compareTo(o2.getWeight());
        }
    };

    /**
     * Orders servers by the weight of their last load, largest first.
     * Both servers must have a non-null last load.
     */
    static Comparator<Server> leaveComparator = new Comparator<Server>() {
        @Override
        public int compare(Server o1, Server o2) {
            return o2.getLastLoad().getWeight().compareTo(o1.getLastLoad().getWeight());
        }
    };

    private Cluster cluster;
    private Action option = Action.Balance;
    private JSONObject result = new JSONObject();

    /** Moves that make up the plan, in the order they were decided. */
    private List<MoveOnce> moves = Lists.newArrayList();

    /** Servers whose weight exceeds the high gate, heaviest first. */
    private PriorityQueue<Server> heavyQueue = new PriorityQueue<Server>(11, heavyComparator);
    /** Servers whose weight does not exceed the high gate, lightest first. */
    private PriorityQueue<Server> lightQueue = new PriorityQueue<Server>(11, lightComparator);

    private void setMoves(List<MoveOnce> moves) {
        this.moves = moves;
    }

    public ShipLoad(Cluster cluster) {
        this.cluster = cluster;
    }

    /**
     * Computes the plan for the cluster and returns it as JSON.
     *
     * <p>The result contains the cluster average, the high and low gates, the chosen
     * action, and under the Balance key the list of moves
     * ({@code load_name}, {@code from}, {@code to}) together with their count.
     *
     * @return the JSON description of the plan
     * @throws IllegalStateException if the cluster has no servers
     */
    public JSONObject makedecision() {
        int serverCount = cluster.getMaps().size();
        if (serverCount == 0) {
            // The average is undefined; fail with a clear message instead of "/ by zero".
            throw new IllegalStateException("cluster has no servers, cannot compute the average load");
        }

        long average = cluster.getWeight() / serverCount;
        result.put(AVERAGE, average);
        result.put(Cluster.HIGH_GATE, cluster.getHighgate());
        result.put(Cluster.LOW_GATE, cluster.getLowgate());

        doFirstBalance(average);

        // A second, tighter pass is only attempted when the cluster is not overloaded
        // on average and the server weights are still spread widely.
        if (average < this.cluster.getHighgate() && computVariance(average)) {
            doSecondBalance(average);
        }

        JSONArray array = new JSONArray();
        for (MoveOnce mv : moves) {
            JSONObject temp = new JSONObject();
            temp.put("load_name", mv.getName().getName());
            temp.put("from", mv.getFrom().getServerName());
            temp.put("to", mv.getTo().getServerName());
            array.add(temp);
        }
        JSONObject balance = new JSONObject();
        balance.put(MOVE_COUNT, array.size());
        balance.put(MOVE_ARRAY, array);

        result.put(ACTION, option.toString());
        result.put(Action.Balance.toString(), balance);
        return result;
    }

    /**
     * Tells whether the weights stored in the cluster map deviate strongly from the average.
     *
     * @param average cluster average load
     * @return {@code true} when the standard deviation divided by the average exceeds
     *         {@link #DEVIATION_THRESHOLD}
     */
    private boolean computVariance(long average) {
        double squaredDeviation = 0d;
        for (long weight : cluster.getMaps().values()) {
            // Square in double so large weights cannot overflow a long.
            double deviation = weight - average;
            squaredDeviation += deviation * deviation;
        }
        double relativeVariance = squaredDeviation / cluster.getMaps().size() / average / average;
        return Math.sqrt(relativeVariance) > DEVIATION_THRESHOLD;
    }

    /**
     * Makes the first plan, aimed at relieving the overloaded servers.
     *
     * <p>Average above the high gate: action Expand. Average below the low gate: action
     * Shrink, retiring {@code serverCount - totalWeight / lowGate} servers. Otherwise:
     * action Balance.
     *
     * @param average cluster average load
     */
    private void doFirstBalance(long average) {
        if (average > cluster.getHighgate()) {
            option = Action.Expand;
            createBalancePlan(true);
        } else if (average < cluster.getLowgate()) {
            long server2shrink = cluster.getMaps().size() - (cluster.getWeight() / cluster.getLowgate());
            option = Action.Shrink;
            createShrinkBalancePlan((int) server2shrink);
        } else {
            option = Action.Balance;
            createBalancePlan(true);
        }
    }

    /**
     * Makes a second, best-effort plan with a tightened high gate.
     *
     * <p>Skipped when the first pass already recorded loads to split. Otherwise the
     * cluster high gate is replaced by the given value plus 10% and the balance plan is
     * re-run without recording split/expand requests.
     *
     * @param highgate base value for the new high gate (the caller passes the cluster average)
     */
    private void doSecondBalance(long highgate) {
        if (!result.containsKey(Action.Split.toString())) {
            cluster.setHighgate(highgate + highgate / 10);
            createBalancePlan(false);
        }
    }

    public List<MoveOnce> getMoves() {
        return moves;
    }

    /**
     * Moves the first load of {@code high} onto {@code low} if that keeps {@code low}
     * at or below the high gate.
     *
     * @param high server giving up a load; its load map must not be empty
     * @param low  server receiving the load
     * @return the moved load, or {@code null} when the move would push {@code low}
     *         above the high gate (nothing is changed in that case)
     */
    private Load moveALoad(Server high, Server low) {
        Load toMove = high.getLoadMap().firstKey();
        if (low.getWeight() + toMove.getWeight() > cluster.getHighgate()) {
            return null;
        }
        high.getLoadMap().remove(toMove);
        low.addLoadMap(toMove);
        return toMove;
    }

    /**
     * Retires the {@code servernum} lightest servers and redistributes their loads.
     *
     * <p>The retired servers are removed from the cluster map and listed under the
     * Shrink key of the result. Each of their loads is then moved onto the currently
     * lightest remaining server.
     *
     * @param servernum number of servers to retire
     * @throws IllegalStateException if no light server is left to receive a load
     * @throws RuntimeException      if the lightest remaining server cannot take a load
     */
    private void createShrinkBalancePlan(int servernum) {
        heavyQueue.clear();
        lightQueue.clear();
        for (Server server : cluster.getMaps().keySet()) {
            addServer2Queue(server);
        }

        JSONArray shrink = new JSONArray();
        List<Server> retired = Lists.newArrayList();
        for (int i = 0; i < servernum; i++) {
            Server lightest = lightQueue.poll();
            if (lightest == null) {
                // Fewer light servers than requested: retire only what is available.
                break;
            }
            retired.add(lightest);
            cluster.getMaps().remove(lightest);   // exclude the node from the cluster
            shrink.add(lightest.getServerName());
        }
        result.put(Action.Shrink.toString(), shrink);

        for (Server from : retired) {
            while (!from.getLoadMap().isEmpty()) {
                Server to = lightQueue.poll();
                if (to == null) {
                    throw new IllegalStateException(
                            "no light server left to take over the loads of " + from.getServerName());
                }
                Load moved = moveALoad(from, to);
                addServer2Queue(to);
                if (moved == null) {
                    // Cannot happen unless the low gate is set above 50% of the high gate.
                    throw new RuntimeException("low gate unreasonable");
                }
                moves.add(new MoveOnce(moved, from, to));
            }
        }
    }

    /**
     * Moves loads from heavy servers to light servers until no heavy server is left
     * or no light server can help.
     *
     * <p>When a direct move is impossible, {@link #makeSpace(Long, Stack)} tries to
     * rearrange the light servers. If that fails too, the heavy server is dropped from
     * the queue and, on the first pass only, reported through
     * {@link #ServerLoadNeed2Split(Server)}.
     *
     * @param first {@code true} for the first pass (unresolvable servers are reported),
     *              {@code false} for the best-effort second pass
     */
    private void createBalancePlan(boolean first) {
        heavyQueue.clear();
        lightQueue.clear();
        for (Server server : cluster.getMaps().keySet()) {
            addServer2Queue(server);
        }

        while (!heavyQueue.isEmpty()) {
            if (lightQueue.isEmpty()) {
                return;
            }

            Server hserver = heavyQueue.poll();
            Server lserver = lightQueue.poll();
            // move a load from the heaviest server to the lightest server
            Load move = moveALoad(hserver, lserver);
            addServer2Queue(lserver);

            if (move != null) {
                addServer2Queue(hserver);
                moves.add(new MoveOnce(move, hserver, lserver));
                continue;
            }

            long loadWeight = hserver.getLoadMap().firstKey().getWeight();
            if (loadWeight < cluster.getHighgate()) { // still have a chance
                Stack<MoveOnce> stack = new Stack<MoveOnce>();
                if (makeSpace(loadWeight, stack)) {
                    // Space was found: the rearrangement becomes part of the plan and
                    // the heavy server gets another turn.
                    moves.addAll(stack);
                    addServer2Queue(hserver);
                    continue;
                }
                // The rearrangement did not help. It is not part of the plan, so the
                // simulated server state must not keep it either.
                undoMoves(stack);
            }

            // The first load of this server cannot be placed anywhere.
            if (first) {
                ServerLoadNeed2Split(hserver);
            }
        }
    }

    /**
     * Reverts the given moves (latest first) on the simulated servers and restores the
     * ordering of the light queue, whose members' weights changed while they were queued.
     *
     * @param stack moves to revert; empty afterwards
     */
    private void undoMoves(Stack<MoveOnce> stack) {
        if (stack.isEmpty()) {
            return;
        }
        while (!stack.isEmpty()) {
            MoveOnce undone = stack.pop();
            undone.getTo().getLoadMap().remove(undone.getName());
            undone.getFrom().addLoadMap(undone.getName());
        }
        // A PriorityQueue does not re-sort elements that are mutated in place.
        List<Server> lights = new ArrayList<Server>(lightQueue);
        lightQueue.clear();
        lightQueue.addAll(lights);
    }

    /**
     * Records that the given heavy server could not be relieved.
     *
     * <p>Under the Expand action the server name is appended to the Expand array of the
     * result; otherwise the names of all its loads are appended to the Split array.
     * The server has already been removed from the heavy queue by the caller.
     *
     * @param hserver server that remains overloaded
     */
    private void ServerLoadNeed2Split(Server hserver) {
        if (this.option.equals(Action.Expand)) {
            // Use the same String key for lookup and store.
            String key = Action.Expand.toString();
            JSONArray expandeServer = result.containsKey(key) ? result.getJSONArray(key) : new JSONArray();
            expandeServer.add(hserver.getServerName());
            result.put(key, expandeServer);
        } else {
            String key = Action.Split.toString();
            JSONArray split = result.containsKey(key) ? result.getJSONArray(key) : new JSONArray();
            for (Load load : hserver.getLoadMap().keySet()) {
                split.add(load.getName());
            }
            result.put(key, split);
        }
    }

    /**
     * Tries to free {@code weight} worth of room on the lightest server by shuffling
     * loads among the light servers.
     *
     * <p>The lightest server repeatedly hands its first load to the next lightest
     * server. When no server can take that load directly, a server holding enough
     * smaller loads is looked for; its small loads are spread over the other light
     * servers and the blocked load is then placed on it.
     *
     * <p>Every move performed is pushed on {@code stack}, also when the method ends up
     * returning {@code false}; the caller decides whether to keep or revert them.
     *
     * @param weight room required on the lightest server
     * @param stack  receives the moves performed, in order
     * @return {@code true} if the lightest server can now take {@code weight} without
     *         exceeding the high gate
     */
    private boolean makeSpace(Long weight, Stack<MoveOnce> stack) {
        if (checkTotalSpare() < weight || lightQueue.size() <= 1) {
            return false;
        }

        Server lowServer = lightQueue.poll();
        move:
        while (lowServer.getWeight() + weight > cluster.getHighgate()) {  // still not enough room
            Server toServer = lightQueue.poll();
            if (toServer == null) {
                lightQueue.add(lowServer);
                return false;
            }
            Load load = moveALoad(lowServer, toServer);
            addServer2Queue(toServer);

            if (load != null) {
                stack.push(new MoveOnce(load, lowServer, toServer));
                continue;
            }

            if (lightQueue.isEmpty()) {
                lightQueue.add(lowServer);
                return false;
            }

            // No light server can take lowServer's first load as things stand. Look for
            // a server whose loads smaller than that load add up to more than it, so the
            // small loads can be moved away and the blocked load put in their place.
            Load load_need_2_move = lowServer.getFirstLoad();

            Server candidate = findSwapCandidate(load_need_2_move);
            if (candidate == null) {
                // no room can be freed
                lightQueue.add(lowServer);
                return false;
            }

            // The candidate is a working copy; find the queued server it was copied from.
            Server tempServer = null;
            for (Server s : lightQueue) {
                if (s.getServerName().equals(candidate.getServerName())) {
                    tempServer = s;
                    break;
                }
            }
            if (tempServer == null) {
                lightQueue.add(lowServer);
                return false;
            }
            lightQueue.remove(tempServer);

            if (lightQueue.isEmpty()) {
                // Nobody left to receive the small loads. Put the server back so it is
                // not lost from the queue.
                lightQueue.add(tempServer);
                lightQueue.add(lowServer);
                return false;
            }

            // Leading loads of tempServer that are smaller than the blocked load.
            Stack<Load> loads2dispatch = new Stack<Load>();
            for (Load small : tempServer.getLoadMap().keySet()) {
                if (small.getWeight() < load_need_2_move.getWeight()) {
                    loads2dispatch.push(small);
                } else {
                    break;
                }
            }

            // Hand the collected loads, last collected first, to the lightest server
            // until more than the blocked load's weight has been moved away.
            long count = 0L;
            while (!loads2dispatch.isEmpty()) {
                Load tempLoad = loads2dispatch.pop();
                if (lightQueue.peek().getWeight() + tempLoad.getWeight() < cluster.getHighgate()) {
                    Server nextLowServer = lightQueue.poll();

                    tempServer.getLoadMap().remove(tempLoad);
                    nextLowServer.addLoadMap(tempLoad);
                    stack.push(new MoveOnce(tempLoad, tempServer, nextLowServer));
                    count += tempLoad.getWeight();

                    lightQueue.add(nextLowServer);

                    if (count > load_need_2_move.getWeight()) {
                        // Enough room was freed: place the blocked load on tempServer.
                        tempServer.addLoadMap(load_need_2_move);
                        lowServer.getLoadMap().remove(load_need_2_move);
                        // Record the load that actually moved, not the last small load.
                        stack.push(new MoveOnce(load_need_2_move, lowServer, tempServer));
                        lightQueue.add(tempServer);
                        continue move;
                    }
                }
            }

            // The small loads could not be placed elsewhere.
            lightQueue.add(tempServer);
            lightQueue.add(lowServer);
            return false;
        }
        lightQueue.add(lowServer);
        return true;
    }

    /**
     * Searches the light queue for a server that could take {@code load_need_2_move}
     * in exchange for its smaller loads.
     *
     * <p>Works on copies obtained from {@code Server.clone()}: loads heavier than the
     * given load are stripped from a copy, and the first copy whose remaining weight
     * still exceeds the load's weight is returned.
     * NOTE(review): this relies on clone() copying the load map; confirm in Server.
     *
     * @param load_need_2_move the load that needs a new home
     * @return a stripped copy of the matching server, or {@code null} if there is none
     */
    private Server findSwapCandidate(Load load_need_2_move) {
        PriorityQueue<Server> tempQueue = new PriorityQueue<Server>(lightQueue.size(), leaveComparator);
        for (Server s : lightQueue) {
            Server copy = s.clone();
            // A server without loads has nothing to swap and would break the comparator.
            if (copy.getLastLoad() != null) {
                tempQueue.add(copy);
            }
        }

        while (!tempQueue.isEmpty()) {
            Server tempServer = tempQueue.poll();
            if (tempServer.getLastLoad().getWeight() > load_need_2_move.getWeight()) {
                tempServer.getLoadMap().remove(tempServer.getLastLoad());
                if (tempServer.getLastLoad() != null) {
                    tempQueue.add(tempServer);
                }
            } else if (tempServer.getWeight() > load_need_2_move.getWeight()) {
                // enough small loads to exchange for the blocked load
                return tempServer;
            }
            // otherwise the server has nothing worth swapping and is dropped
        }
        return null;
    }

    /**
     * Sums the free capacity of the light servers.
     *
     * @return the total of {@code highGate - weight} over all servers in the light queue
     */
    private Long checkTotalSpare() {
        long totalSpareSize = 0L;
        for (Server s : lightQueue) {
            totalSpareSize += cluster.getHighgate() - s.getWeight();
        }
        return totalSpareSize;
    }

    /**
     * Puts the server into the heavy queue when its weight exceeds the high gate,
     * otherwise into the light queue.
     *
     * @param s server to enqueue
     */
    public void addServer2Queue(Server s) {
        if (s.getWeight() > cluster.getHighgate()) {
            heavyQueue.add(s);
        } else {
            lightQueue.add(s);
        }
    }

}
